import numpy as np
import pandas as pd
import pytest

from freqtrade.data.dataprovider import DataProvider
from freqtrade.enums import CandleType
from freqtrade.resolvers.strategy_resolver import StrategyResolver
from freqtrade.strategy import merge_informative_pair, stoploss_from_absolute, stoploss_from_open
from tests.conftest import generate_test_data, get_patched_exchange


def test_merge_informative_pair():
    data = generate_test_data("15m", 40)
    informative = generate_test_data("1h", 40)
    cols_inf = list(informative.columns)

    result = merge_informative_pair(data, informative, "15m", "1h", ffill=True)
    assert isinstance(result, pd.DataFrame)
    assert list(informative.columns) == cols_inf
    assert len(result) == len(data)
    assert "date" in result.columns
    assert result["date"].equals(data["date"])
    assert "date_1h" in result.columns

    assert "open" in result.columns
    assert "open_1h" in result.columns
    assert result["open"].equals(data["open"])

    assert "close" in result.columns
    assert "close_1h" in result.columns
    assert result["close"].equals(data["close"])

    assert "volume" in result.columns
    assert "volume_1h" in result.columns
    assert result["volume"].equals(data["volume"])

    # First 3 rows are empty.
    # Pre-fillup doesn't happen as there is no prior candlw in the informative dataframe
    assert result.iloc[0]["date_1h"] is pd.NaT
    assert result.iloc[1]["date_1h"] is pd.NaT
    assert result.iloc[2]["date_1h"] is pd.NaT
    # Next 4 rows contain the starting date (0:00)
    assert result.iloc[3]["date_1h"] == result.iloc[0]["date"]
    assert result.iloc[4]["date_1h"] == result.iloc[0]["date"]
    assert result.iloc[5]["date_1h"] == result.iloc[0]["date"]
    assert result.iloc[6]["date_1h"] == result.iloc[0]["date"]
    # Next 4 rows contain the next Hourly date original date row 4
    assert result.iloc[7]["date_1h"] == result.iloc[4]["date"]
    assert result.iloc[8]["date_1h"] == result.iloc[4]["date"]

    informative = generate_test_data("1h", 40)
    result = merge_informative_pair(data, informative, "15m", "1h", ffill=False)
    # First 3 rows are empty
    assert result.iloc[0]["date_1h"] is pd.NaT
    assert result.iloc[1]["date_1h"] is pd.NaT
    assert result.iloc[2]["date_1h"] is pd.NaT
    # Next 4 rows contain the starting date (0:00)
    assert result.iloc[3]["date_1h"] == result.iloc[0]["date"]
    assert result.iloc[4]["date_1h"] is pd.NaT
    assert result.iloc[5]["date_1h"] is pd.NaT
    assert result.iloc[6]["date_1h"] is pd.NaT
    # Next 4 rows contain the next Hourly date original date row 4
    assert result.iloc[7]["date_1h"] == result.iloc[4]["date"]
    assert result.iloc[8]["date_1h"] is pd.NaT


def test_merge_informative_pair_weekly():
    # Covers roughly 2 months - until 2023-01-10
    data = generate_test_data("1h", 1040, "2022-11-28")
    informative = generate_test_data("1w", 40, "2022-11-01")
    informative["day"] = informative["date"].dt.day_name()

    result = merge_informative_pair(data, informative, "1h", "1w", ffill=True)
    assert isinstance(result, pd.DataFrame)
    # 2022-12-24 is a Saturday
    candle1 = result.loc[(result["date"] == "2022-12-24T22:00:00.000Z")]
    assert candle1.iloc[0]["date"] == pd.Timestamp("2022-12-24T22:00:00.000Z")
    assert candle1.iloc[0]["date_1w"] == pd.Timestamp("2022-12-12T00:00:00.000Z")

    candle2 = result.loc[(result["date"] == "2022-12-24T23:00:00.000Z")]
    assert candle2.iloc[0]["date"] == pd.Timestamp("2022-12-24T23:00:00.000Z")
    assert candle2.iloc[0]["date_1w"] == pd.Timestamp("2022-12-12T00:00:00.000Z")

    # 2022-12-25 is a Sunday
    candle3 = result.loc[(result["date"] == "2022-12-25T22:00:00.000Z")]
    assert candle3.iloc[0]["date"] == pd.Timestamp("2022-12-25T22:00:00.000Z")
    # Still old candle
    assert candle3.iloc[0]["date_1w"] == pd.Timestamp("2022-12-12T00:00:00.000Z")

    candle4 = result.loc[(result["date"] == "2022-12-25T23:00:00.000Z")]
    assert candle4.iloc[0]["date"] == pd.Timestamp("2022-12-25T23:00:00.000Z")
    assert candle4.iloc[0]["date_1w"] == pd.Timestamp("2022-12-19T00:00:00.000Z")


def test_merge_informative_pair_monthly():
    # Covers roughly 2 months - until 2023-01-10
    data = generate_test_data("1h", 1040, "2022-11-28")
    informative = generate_test_data("1M", 40, "2022-01-01")

    result = merge_informative_pair(data, informative, "1h", "1M", ffill=True)
    assert isinstance(result, pd.DataFrame)
    candle1 = result.loc[(result["date"] == "2022-12-31T22:00:00.000Z")]
    assert candle1.iloc[0]["date"] == pd.Timestamp("2022-12-31T22:00:00.000Z")
    assert candle1.iloc[0]["date_1M"] == pd.Timestamp("2022-11-01T00:00:00.000Z")

    candle2 = result.loc[(result["date"] == "2022-12-31T23:00:00.000Z")]
    assert candle2.iloc[0]["date"] == pd.Timestamp("2022-12-31T23:00:00.000Z")
    assert candle2.iloc[0]["date_1M"] == pd.Timestamp("2022-12-01T00:00:00.000Z")

    # Candle is empty, as the start-date did fail.
    candle3 = result.loc[(result["date"] == "2022-11-30T22:00:00.000Z")]
    assert candle3.iloc[0]["date"] == pd.Timestamp("2022-11-30T22:00:00.000Z")
    # Merged on prior month
    assert candle3.iloc[0]["date_1M"] == pd.Timestamp("2022-10-01T00:00:00.000Z")

    # First candle with 1M data merged.
    candle4 = result.loc[(result["date"] == "2022-11-30T23:00:00.000Z")]
    assert candle4.iloc[0]["date"] == pd.Timestamp("2022-11-30T23:00:00.000Z")
    assert candle4.iloc[0]["date_1M"] == pd.Timestamp("2022-11-01T00:00:00.000Z")

    # Very first candle in the result dataframe
    # Merged the latest informative candle before the start-date
    candle5 = result.iloc[0]
    assert candle5["date"] == pd.Timestamp("2022-11-28T00:00:00.000Z")
    assert candle5["date_1M"] == pd.Timestamp("2022-10-01T00:00:00.000Z")


def test_merge_informative_pair_no_overlap():
    # Covers roughly a day
    data = generate_test_data("1m", 1440, "2022-11-28")
    # Data stops WAY before the main data starts
    informative = generate_test_data("1h", 40, "2022-11-01")

    result = merge_informative_pair(data, informative, "1m", "1h", ffill=True)

    assert isinstance(result, pd.DataFrame)
    assert len(result) == len(data)
    assert "date" in result.columns
    assert result["date"].equals(data["date"])
    assert "date_1h" in result.columns
    # If there's no overlap, forward filling should not fill anything
    assert result["date_1h"].isnull().all()


def test_merge_informative_pair_same():
    data = generate_test_data("15m", 40)
    informative = generate_test_data("15m", 40)

    result = merge_informative_pair(data, informative, "15m", "15m", ffill=True)
    assert isinstance(result, pd.DataFrame)
    assert len(result) == len(data)
    assert "date" in result.columns
    assert result["date"].equals(data["date"])
    assert "date_15m" in result.columns

    assert "open" in result.columns
    assert "open_15m" in result.columns
    assert result["open"].equals(data["open"])

    assert "close" in result.columns
    assert "close_15m" in result.columns
    assert result["close"].equals(data["close"])

    assert "volume" in result.columns
    assert "volume_15m" in result.columns
    assert result["volume"].equals(data["volume"])

    # Dates match 1:1
    assert result["date_15m"].equals(result["date"])


def test_merge_informative_pair_lower():
    data = generate_test_data("1h", 40)
    informative = generate_test_data("15m", 40)

    with pytest.raises(ValueError, match=r"Tried to merge a faster timeframe .*"):
        merge_informative_pair(data, informative, "1h", "15m", ffill=True)


def test_merge_informative_pair_empty():
    data = generate_test_data("1h", 40)
    informative = pd.DataFrame(columns=data.columns)

    result = merge_informative_pair(data, informative, "1h", "2h", ffill=True)
    assert result["date"].equals(data["date"])

    assert list(result.columns) == [
        "date",
        "open",
        "high",
        "low",
        "close",
        "volume",
        "date_2h",
        "open_2h",
        "high_2h",
        "low_2h",
        "close_2h",
        "volume_2h",
    ]
    # We merge an empty dataframe, so all values should be NaN
    for col in ["date_2h", "open_2h", "high_2h", "low_2h", "close_2h", "volume_2h"]:
        assert result[col].isnull().all()


def test_merge_informative_pair_suffix():
    data = generate_test_data("15m", 20)
    informative = generate_test_data("1h", 20)

    result = merge_informative_pair(
        data, informative, "15m", "1h", append_timeframe=False, suffix="suf"
    )

    assert "date" in result.columns
    assert result["date"].equals(data["date"])
    assert "date_suf" in result.columns

    assert "open_suf" in result.columns
    assert "open_1h" not in result.columns

    assert list(result.columns) == [
        "date",
        "open",
        "high",
        "low",
        "close",
        "volume",
        "date_suf",
        "open_suf",
        "high_suf",
        "low_suf",
        "close_suf",
        "volume_suf",
    ]


def test_merge_informative_pair_suffix_append_timeframe():
    data = generate_test_data("15m", 20)
    informative = generate_test_data("1h", 20)

    with pytest.raises(ValueError, match=r"You can not specify `append_timeframe` .*"):
        merge_informative_pair(data, informative, "15m", "1h", suffix="suf")


@pytest.mark.parametrize(
    "side,profitrange",
    [
        # profit range for long is [-1, inf] while for shorts is [-inf, 1]
        ("long", [-0.99, 2, 30]),
        ("short", [-2.0, 0.99, 30]),
    ],
)
def test_stoploss_from_open(side, profitrange):
    open_price_ranges = [
        [0.01, 1.00, 30],
        [1, 100, 30],
        [100, 10000, 30],
    ]

    for open_range in open_price_ranges:
        for open_price in np.linspace(*open_range):
            for desired_stop in np.linspace(-0.50, 0.50, 30):
                if side == "long":
                    # -1 is not a valid current_profit, should return 1
                    assert stoploss_from_open(desired_stop, -1) == 1
                else:
                    # 1 is not a valid current_profit for shorts, should return 1
                    assert stoploss_from_open(desired_stop, 1, True) == 1

                for current_profit in np.linspace(*profitrange):
                    if side == "long":
                        current_price = open_price * (1 + current_profit)
                        expected_stop_price = open_price * (1 + desired_stop)
                        stoploss = stoploss_from_open(desired_stop, current_profit)
                        stop_price = current_price * (1 - stoploss)
                    else:
                        current_price = open_price * (1 - current_profit)
                        expected_stop_price = open_price * (1 - desired_stop)
                        stoploss = stoploss_from_open(desired_stop, current_profit, True)
                        stop_price = current_price * (1 + stoploss)

                    assert stoploss >= 0
                    # Technically the formula can yield values greater than 1 for shorts
                    # even though it doesn't make sense because the position would be liquidated
                    if side == "long":
                        assert stoploss <= 1

                    # there is no correct answer if the expected stop price is above
                    # the current price
                    if (side == "long" and expected_stop_price > current_price) or (
                        side == "short" and expected_stop_price < current_price
                    ):
                        assert stoploss == 0
                    else:
                        assert pytest.approx(stop_price) == expected_stop_price


@pytest.mark.parametrize(
    "side,rel_stop,curr_profit,leverage,expected",
    [
        # profit range for long is [-1, inf] while for shorts is [-inf, 1]
        ("long", 0, -1, 1, 1),
        ("long", 0, 0.1, 1, 0.09090909),
        ("long", -0.1, 0.1, 1, 0.18181818),
        ("long", 0.1, 0.2, 1, 0.08333333),
        ("long", 0.1, 0.5, 1, 0.266666666),
        ("long", 0.1, 5, 1, 0.816666666),  # 500% profit, set stoploss to 10% above open price
        ("long", 0, 5, 10, 3.3333333),  # 500% profit, set stoploss break even
        ("long", 0.1, 5, 10, 3.26666666),  # 500% profit, set stoploss to 10% above open price
        ("long", -0.1, 5, 10, 3.3999999),  # 500% profit, set stoploss to 10% belowopen price
        ("short", 0, 0.1, 1, 0.1111111),
        ("short", -0.1, 0.1, 1, 0.2222222),
        ("short", 0.1, 0.2, 1, 0.125),
        ("short", 0.1, 1, 1, 1),
        ("short", -0.01, 5, 10, 10.01999999),  # 500% profit at 10x
    ],
)
def test_stoploss_from_open_leverage(side, rel_stop, curr_profit, leverage, expected):
    stoploss = stoploss_from_open(rel_stop, curr_profit, side == "short", leverage)
    assert pytest.approx(stoploss) == expected
    open_rate = 100
    if stoploss != 1:
        if side == "long":
            current_rate = open_rate * (1 + curr_profit / leverage)
            stop = current_rate * (1 - stoploss / leverage)
            assert pytest.approx(stop) == open_rate * (1 + rel_stop / leverage)
        else:
            current_rate = open_rate * (1 - curr_profit / leverage)
            stop = current_rate * (1 + stoploss / leverage)
            assert pytest.approx(stop) == open_rate * (1 - rel_stop / leverage)


def test_stoploss_from_absolute():
    assert pytest.approx(stoploss_from_absolute(90, 100)) == 1 - (90 / 100)
    assert pytest.approx(stoploss_from_absolute(90, 100)) == 0.1
    assert pytest.approx(stoploss_from_absolute(95, 100)) == 0.05
    assert pytest.approx(stoploss_from_absolute(100, 100)) == 0
    assert pytest.approx(stoploss_from_absolute(110, 100)) == 0
    assert pytest.approx(stoploss_from_absolute(100, 0)) == 1
    assert pytest.approx(stoploss_from_absolute(0, 100)) == 1
    assert pytest.approx(stoploss_from_absolute(0, 100, False, leverage=5)) == 5

    assert pytest.approx(stoploss_from_absolute(90, 100, True)) == 0
    assert pytest.approx(stoploss_from_absolute(100, 100, True)) == 0
    assert pytest.approx(stoploss_from_absolute(110, 100, True)) == -(1 - (110 / 100))
    assert pytest.approx(stoploss_from_absolute(110, 100, True)) == 0.1
    assert pytest.approx(stoploss_from_absolute(105, 100, True)) == 0.05
    assert pytest.approx(stoploss_from_absolute(105, 100, True, 5)) == 0.05 * 5
    assert pytest.approx(stoploss_from_absolute(100, 0, True)) == 1
    assert pytest.approx(stoploss_from_absolute(0, 100, True)) == 0
    assert pytest.approx(stoploss_from_absolute(100, 1, is_short=True)) == 1
    assert pytest.approx(stoploss_from_absolute(100, 1, is_short=True, leverage=5)) == 5


@pytest.mark.parametrize("trading_mode", ["futures", "spot"])
def test_informative_decorator(mocker, default_conf_usdt, trading_mode):
    candle_def = CandleType.get_default(trading_mode)
    default_conf_usdt["candle_type_def"] = candle_def
    test_data_5m = generate_test_data("5m", 40)
    test_data_30m = generate_test_data("30m", 40)
    test_data_1h = generate_test_data("1h", 40)
    data = {
        ("XRP/USDT", "5m", candle_def): test_data_5m,
        ("XRP/USDT", "30m", candle_def): test_data_30m,
        ("XRP/USDT", "1h", candle_def): test_data_1h,
        ("XRP/BTC", "1h", candle_def): test_data_1h,  # from {base}/BTC
        ("LTC/USDT", "5m", candle_def): test_data_5m,
        ("LTC/USDT", "30m", candle_def): test_data_30m,
        ("LTC/USDT", "1h", candle_def): test_data_1h,
        ("LTC/BTC", "1h", candle_def): test_data_1h,  # from {base}/BTC
        ("NEO/USDT", "30m", candle_def): test_data_30m,
        ("NEO/USDT", "5m", CandleType.SPOT): test_data_5m,  # Explicit request with '' as candletype
        ("NEO/USDT", "15m", candle_def): test_data_5m,  # Explicit request with '' as candletype
        ("NEO/USDT", "1h", candle_def): test_data_1h,
        ("ETH/USDT", "1h", candle_def): test_data_1h,
        ("ETH/USDT", "30m", candle_def): test_data_30m,
        ("ETH/BTC", "1h", CandleType.SPOT): test_data_1h,  # Explicitly selected as spot
    }
    default_conf_usdt["strategy"] = "InformativeDecoratorTest"
    strategy = StrategyResolver.load_strategy(default_conf_usdt)
    exchange = get_patched_exchange(mocker, default_conf_usdt)
    default_conf_usdt["candle_type_def"] = candle_def
    strategy.dp = DataProvider({}, exchange, None)
    mocker.patch.object(
        strategy.dp, "current_whitelist", return_value=["XRP/USDT", "LTC/USDT", "NEO/USDT"]
    )

    assert len(strategy._ft_informative) == 7  # Equal to number of decorators used
    informative_pairs = [
        ("XRP/USDT", "1h", candle_def),
        ("XRP/BTC", "1h", candle_def),
        ("LTC/USDT", "1h", candle_def),
        ("LTC/BTC", "1h", candle_def),
        ("XRP/USDT", "30m", candle_def),
        ("LTC/USDT", "30m", candle_def),
        ("NEO/USDT", "1h", candle_def),
        ("NEO/USDT", "30m", candle_def),
        ("NEO/USDT", "5m", candle_def),
        ("NEO/USDT", "15m", candle_def),
        ("NEO/USDT", "2h", CandleType.FUTURES),
        ("ETH/BTC", "1h", CandleType.SPOT),  # One candle remains as spot
        ("ETH/USDT", "30m", candle_def),
    ]
    for inf_pair in informative_pairs:
        assert inf_pair in strategy.gather_informative_pairs()

    def test_historic_ohlcv(pair, timeframe, candle_type):
        return data.get(
            (pair, timeframe or strategy.timeframe, CandleType.from_string(candle_type)),
            pd.DataFrame(),
        ).copy()

    mocker.patch(
        "freqtrade.data.dataprovider.DataProvider.historic_ohlcv", side_effect=test_historic_ohlcv
    )

    analyzed = strategy.advise_all_indicators(
        {p: data[(p, strategy.timeframe, candle_def)] for p in ("XRP/USDT", "LTC/USDT")}
    )
    expected_columns = [
        "rsi_1h",
        "rsi_30m",  # Stacked informative decorators
        "neo_usdt_rsi_1h",  # NEO 1h informative
        "rsi_NEO_USDT_neo_usdt_NEO/USDT_30m",  # Column formatting
        "rsi_from_callable",  # Custom column formatter
        "eth_btc_rsi_1h",  # Quote currency not matching stake currency
        "rsi",
        "rsi_less",  # Non-informative columns
        "rsi_5m",  # Manual informative dataframe
    ]
    for _, dataframe in analyzed.items():
        for col in expected_columns:
            assert col in dataframe.columns

    # Test non-available pairs
    del data[("ETH/BTC", "1h", CandleType.SPOT)]
    with pytest.raises(
        ValueError, match=r"Informative dataframe for \(ETH\/BTC, 1h, spot\) is empty.*"
    ):
        strategy.advise_all_indicators(
            {p: data[(p, strategy.timeframe, candle_def)] for p in ("XRP/USDT", "LTC/USDT")}
        )
